Download hourly EPA emissions

This notebook downloads zipped CEMS data from the EPA ftp server, extracts the data, corrects column names, combines the data, and exports one data file per year.


This code is the least complete out of all sections. The parts that identify new ftp files for download (using already_downloaded for existing files and ftp server upload dates) may require re-writing.

Because not all data needs to be re-downloaded for every update, check to make sure that the correct years are used in loops for downloading and processing data.

import io, time, json
import pandas as pd
import urllib#, urllib2
import re
import os
from os.path import join
import numpy as np
import ftplib
from ftplib import FTP
import timeit
import sys
from joblib import Parallel, delayed

cwd = os.getcwd()
data_path = join(cwd, '..', 'Data storage')

%load_ext watermark
%watermark -iv -v

json        2.0.9
pandas      0.21.1
re          2.2.1
numpy       1.13.3
CPython 3.6.3
IPython 6.2.1

# Load the "autoreload" extension
%load_ext autoreload

# always reload modules marked with "%aimport"
%autoreload 1

# add the 'src' directory as one where we can import modules
src_dir = join(os.getcwd(), os.pardir, 'src')

%aimport Data.data_extraction
from Data.data_extraction import import_clean_epa

Final script

This downloads hourly data from the ftp server over a range of years, and saves all of the file names/last update times in a list. The downloads can take some time depending on how much data is being retrieved.

Some of the code below assumes that we only need to retrieve new or modified files. If you are retrieving this data for the first time, create an empty dataframe named already_downloaded with column names file name and last updated.

# Replace the filename with whatever csv stores already downloaded file info
# path = join('EPA downloads', 'name_time 2015-2016.csv')
# already_downloaded = pd.read_csv(path, parse_dates=['last updated'])

# Uncomment the line below to create an empty dataframe
already_downloaded = pd.DataFrame(columns=['file name', 'last updated'])

file name last updated

# Years of data to download
years = [2017]

# Make the EPA download directory if it doesn't exist
    os.mkdir(join(data_path, 'EPA downloads'))

# Timestamp
start_time = timeit.default_timer()
name_time_list = []
# Open ftp connection and navigate to the correct folder
print('Opening ftp connection')
ftp = FTP('')

for year in years: 

    year_str = str(year)
    print('Change directory to {}'.format(year_str))
    except ftplib.all_errors as e:
    # Use ftplib to get the list of filenames
    print('Fetch filenames')
    fnames = ftp.nlst()
    # Create new directory path if it doesn't exist
    new_path = join(data_path, 'EPA downloads', year_str)
        print('New local folder created for {}'.format(year_str))
    # Look for files without _HLD in the name
    name_list = []
    time_list = []
    print('Find filenames without _HLD and time last updated')
    for name in fnames:
        if '_HLD' not in name:
                # The ftp command "MDTM" asks what time a file was last modified
                # It returns a code and the date/time
                # If the file name isn't already downloaded, or the time isn't the same
                tm = ftp.sendcmd('MDTM '+ name).split()[-1]
                dt = pd.to_datetime(tm[:8], format='%Y%m%d')
                if name not in already_downloaded['file name'].values:
                elif (already_downloaded
                      .loc[already_downloaded['file name']==name,
                           'last updated'].values[0] != tm):
                    tm = ftp.sendcmd('MDTM '+ name).split()[-1]
                    dt = pd.to_datetime(tm[:8], format='%Y%m%d')

            except ftplib.all_errors as e:
                # If ftp.sendcmd didn't work, assume the connection was lost
                ftp = FTP('')
#                 tm = ftp.sendcmd('MDTM '+ name)
#                 time_list.append(pd.to_datetime(tm.split()[-1]))
    # Store all filenames and update times
    print('Store names and update times')
    name_time_list.extend(zip(name_list, time_list))
    # Download and store data
    print('Downloading data')
    for name in name_list:
            with open(join(data_path, 'EPA downloads', year_str, name),
                      'wb') as f:
                ftp.retrbinary('RETR %s' % name, f.write)
        except ftplib.all_errors as e:
            except ftplib.all_errors as e:
            ftp = FTP('')
            with open(join(data_path, 'EPA downloads', year_str, name),
                      'wb') as f:
                ftp.retrbinary('RETR %s' % name, f.write)

    print('Download finished')
    print(round((timeit.default_timer() - start_time)/60.0,2), 'min so far')
    # Go back up a level on the ftp server
# Timestamp
elapsed = round((timeit.default_timer() - start_time)/60.0,2)

Opening ftp connection
Change directory to 2017
Fetch filenames
Find filenames without _HLD and time last updated
Store names and update times
Downloading data
[WinError 10054] An existing connection was forcibly closed by the remote host
[WinError 10054] An existing connection was forcibly closed by the remote host
[WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
[WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
Download finished
17.42 min so far

print 'Data download completed in %s mins' %(elapsed)

Data download completed in 42.2 mins

Export file names and update timestamp

This is just a sample file with 2017 filenames

name_time_df = pd.DataFrame(name_time_list, columns=['file name', 'last updated'])

file name last updated

path = join(data_path, 'EPA downloads', 'name_time example.csv')
name_time_df.to_csv(path, index=False)

Check number of columns and column names

The results below are from 2001-2016 data. They show how column names have changed over the years.

import csv
import zipfile
from io import BytesIO
from collections import Counter

This takes ~100 ms per file.

base_path = join(data_path, 'EPA downloads')
num_cols = {}
col_names = {}
for year in range(2001, 2018):

    n_cols_list = []
    col_name_list = []
    path = join(base_path, str(year))
    fnames = os.listdir(path)
    for name in fnames:
        fullpath = os.path.join(path, name)
        columns = pd.read_csv(fullpath, nrows=5).columns
        # Add the column names to the large list
        # Add the number of columns to the list
    col_names[year] = Counter(col_name_list)
    num_cols[year] = Counter(n_cols_list)

From the table below, recent years always have the units after an emission name. Before 2009 some files have the units and some don't. UNITID is consistent through all years, but UNIT_ID was added in after 2008 (not the same thing).

In [19]:

2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017
CO2_MASS 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
CO2_MASS (tons) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
CO2_MASS_MEASURE_FLG 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
CO2_RATE 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
CO2_RATE (tons/mmBtu) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
CO2_RATE_MEASURE_FLG 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
FACILITY_NAME 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
FAC_ID NaN NaN NaN NaN NaN NaN NaN 98 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
GLOAD 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
GLOAD (MW) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
HEAT_INPUT 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
HEAT_INPUT (mmBtu) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
NOX_MASS 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
NOX_MASS (lbs) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
NOX_MASS_MEASURE_FLG 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
NOX_RATE 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
NOX_RATE (lbs/mmBtu) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
NOX_RATE_MEASURE_FLG 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
OP_DATE 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
OP_HOUR 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
OP_TIME 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
ORISPL_CODE 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
SLOAD 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
SLOAD (1000 lbs) 164.0 159.0 134.0 126.0 60.0 1.0 10.0 54 363.0 NaN NaN NaN NaN NaN NaN NaN NaN
SLOAD (1000lb/hr) 24.0 29.0 30.0 32.0 30.0 32.0 43.0 53 225.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
SO2_MASS 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
SO2_MASS (lbs) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
SO2_MASS_MEASURE_FLG 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
SO2_RATE 8.0 8.0 32.0 38.0 106.0 163.0 143.0 89 NaN NaN NaN NaN NaN NaN NaN NaN NaN
SO2_RATE (lbs/mmBtu) 188.0 188.0 164.0 158.0 90.0 33.0 53.0 107 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
SO2_RATE_MEASURE_FLG 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
STATE 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
UNITID 196.0 196.0 196.0 196.0 196.0 196.0 196.0 196 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0
UNIT_ID NaN NaN NaN NaN NaN NaN NaN 98 588.0 588.0 588.0 588.0 588.0 588.0 591.0 588.0 588.0

Index([u'CO2_MASS', u'CO2_MASS (tons)', u'CO2_MASS_MEASURE_FLG', u'CO2_RATE',
       u'CO2_RATE (tons/mmBtu)', u'CO2_RATE_MEASURE_FLG', u'FACILITY_NAME',
       u'FAC_ID', u'GLOAD', u'GLOAD (MW)', u'HEAT_INPUT',
       u'HEAT_INPUT (mmBtu)', u'NOX_MASS', u'NOX_MASS (lbs)',
       u'NOX_MASS_MEASURE_FLG', u'NOX_RATE', u'NOX_RATE (lbs/mmBtu)',
       u'ORISPL_CODE', u'SLOAD', u'SLOAD (1000 lbs)', u'SLOAD (1000lb/hr)',
       u'SO2_MASS', u'SO2_MASS (lbs)', u'SO2_MASS_MEASURE_FLG', u'SO2_RATE',
       u'SO2_RATE (lbs/mmBtu)', u'SO2_RATE_MEASURE_FLG', u'STATE', u'UNITID',

2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016
22 196.0 196.0 196.0 196.0 196.0 196.0 196.0 98 NaN NaN NaN NaN NaN NaN NaN NaN
24 NaN NaN NaN NaN NaN NaN NaN 98 588.0 588.0 588.0 588.0 588.0 588.0 591.0 441.0

Correct column names and export all files (1 file per year)

Also convert the OP_DATE column to a datetime object

Using joblib for this.

Joblib on windows requires the if name == 'main': statement. And in a Jupyter notebook the function needs to be imported from an external script. I probably should have done the parallel part at a higher level - the longest part is saving the csv files. Could use this method - disable a check - to speed up the process.

Joblib has to be at least version 10.0, which is only available through pip - got some errors when using the version installed by conda.

Create a dictionary mapping column names. Any values on the left (keys) should be replaced by values on the right (values).

col_name_map = {'CO2_MASS' : 'CO2_MASS (tons)',
                'CO2_RATE' : 'CO2_RATE (tons/mmBtu)',
                'GLOAD' : 'GLOAD (MW)',
                'HEAT_INPUT' : 'HEAT_INPUT (mmBtu)',
                'NOX_MASS' : 'NOX_MASS (lbs)',
                'NOX_RATE' : 'NOX_RATE (lbs/mmBtu)',
                'SLOAD' : 'SLOAD (1000lb/hr)',
                'SLOAD (1000 lbs)' : 'SLOAD (1000lb/hr)',
                'SO2_MASS' : 'SO2_MASS (lbs)',
                'SO2_RATE' : 'SO2_RATE (lbs/mmBtu)'

Be sure to change the start and end years

years = [2016, 2017]

if __name__ == '__main__':
    start_time = timeit.default_timer()
    base_path = join(data_path, 'EPA downloads')
    for year in years:
        print('Starting', str(year))
        df_list = []
        path = join(base_path, str(year))
        fnames = os.listdir(path)

        df_list = Parallel(n_jobs=-1)(delayed(import_clean_epa)
                                      (path, name, col_name_map) 
                                      for name in fnames)

        print('Combining data')
        df = pd.concat(df_list)
        print('Saving file')
        path_out = join(data_path, 'EPA emissions',
                        'EPA emissions {}.csv'.format(str(year)))
        df.to_csv(path_out, index=False)
        # File IO is much faster with the feather file format
#         df.to_feather(path_out)

        print(round((timeit.default_timer() - start_time)/60.0,2), 'min so far')
    # Timestamp
    elapsed = round((timeit.default_timer() - start_time)/60.0,2)

Starting 2016
Combining data
Saving file
16.46 min so far
Starting 2017
Combining data
Saving file
31.72 min so far

